You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/05/20 19:27:01 UTC
[05/12] GIRAPH-667: Decouple Vertex data and Computation,
make Computation and Combiner classes switchable (majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
index 0039ad6..d210928 100644
--- a/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/MockUtils.java
@@ -22,12 +22,13 @@ import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.messages.ByteArrayMessagesPerVertexStore;
+import org.apache.giraph.graph.Computation;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.ArrayListEdges;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.partition.BasicPartitionOwner;
import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.edge.ArrayListEdges;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
@@ -55,7 +56,7 @@ public class MockUtils {
public static class MockedEnvironment<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable> {
- private final GraphState<I, V, E, M> graphState;
+ private final GraphState graphState;
private final Mapper.Context context;
private final Configuration conf;
private final WorkerClientRequestProcessor workerClientRequestProcessor;
@@ -101,14 +102,15 @@ public class MockUtils {
}
/**
- * prepare a vertex for use in a unit test by setting its internal state and injecting mocked
- * dependencies,
+ * Prepare a vertex and computation for use in a unit test by setting their
+ * internal state and injecting mocked dependencies.
*
- * @param vertex
- * @param superstep the superstep to emulate
+ * @param vertex Vertex
* @param vertexId initial vertex id
* @param vertexValue initial vertex value
* @param isHalted initial halted state of the vertex
+ * @param computation Computation
+ * @param superstep Superstep
* @param <I> vertex id
* @param <V> vertex data
* @param <E> edge data
@@ -116,47 +118,42 @@ public class MockUtils {
* @return
* @throws Exception
*/
- public static <I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable>
- MockedEnvironment<I, V, E, M> prepareVertex(
- Vertex<I, V, E, M> vertex, long superstep, I vertexId,
- V vertexValue, boolean isHalted) throws Exception {
-
- MockedEnvironment<I, V, E, M> env =
- new MockedEnvironment<I, V, E, M>();
-
- Mockito.when(env.getGraphState().getSuperstep()).thenReturn(superstep);
- Mockito.when(env.getGraphState().getContext())
- .thenReturn(env.getContext());
- Mockito.when(env.getContext().getConfiguration())
- .thenReturn(env.getConfiguration());
- Mockito.when(env.getGraphState().getWorkerClientRequestProcessor())
- .thenReturn(env.getWorkerClientRequestProcessor());
-
- GiraphConfiguration giraphConf = new GiraphConfiguration();
- giraphConf.setVertexClass(vertex.getClass());
- ImmutableClassesGiraphConfiguration<I, V, E, M> conf =
- new ImmutableClassesGiraphConfiguration<I, V, E, M>(giraphConf);
- vertex.setConf(conf);
- ArrayListEdges<I, E> edges = new ArrayListEdges<I, E>();
- edges.setConf((ImmutableClassesGiraphConfiguration<I, Writable, E,
- Writable>) conf);
- edges.initialize();
-
- ReflectionUtils.setField(vertex, "id", vertexId);
- ReflectionUtils.setField(vertex, "value", vertexValue);
- ReflectionUtils.setField(vertex, "edges", edges);
- ReflectionUtils.setField(vertex, "graphState", env.getGraphState());
- ReflectionUtils.setField(vertex, "halt", isHalted);
-
- return env;
+ public static <I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable>
+ MockedEnvironment<I, V, E, M> prepareVertexAndComputation(
+ Vertex<I, V, E> vertex, I vertexId, V vertexValue, boolean isHalted,
+ Computation<I, V, E, M, M> computation, long superstep) throws
+ Exception {
+ MockedEnvironment<I, V, E, M> env = new MockedEnvironment<I, V, E, M>();
+ Mockito.when(env.getGraphState().getSuperstep()).thenReturn(superstep);
+ Mockito.when(env.getGraphState().getContext())
+ .thenReturn(env.getContext());
+ Mockito.when(env.getContext().getConfiguration())
+ .thenReturn(env.getConfiguration());
+ computation.initialize(env.getGraphState(),
+ env.getWorkerClientRequestProcessor(), null, null, null);
+
+ GiraphConfiguration giraphConf = new GiraphConfiguration();
+ giraphConf.setComputationClass(computation.getClass());
+ giraphConf.setOutEdgesClass(ArrayListEdges.class);
+ ImmutableClassesGiraphConfiguration<I, V, E> conf =
+ new ImmutableClassesGiraphConfiguration<I, V, E>(giraphConf);
+ computation.setConf(conf);
+
+ vertex.setConf(conf);
+ vertex.initialize(vertexId, vertexValue);
+ if (isHalted) {
+ vertex.voteToHalt();
}
+ return env;
+ }
+
public static CentralizedServiceWorker<IntWritable, IntWritable,
- IntWritable, IntWritable> mockServiceGetVertexPartitionOwner(final int
+ IntWritable> mockServiceGetVertexPartitionOwner(final int
numOfPartitions) {
- CentralizedServiceWorker<IntWritable, IntWritable, IntWritable,
- IntWritable> service = Mockito.mock(CentralizedServiceWorker.class);
+ CentralizedServiceWorker<IntWritable, IntWritable, IntWritable> service =
+ Mockito.mock(CentralizedServiceWorker.class);
Answer<PartitionOwner> answer = new Answer<PartitionOwner>() {
@Override
public PartitionOwner answer(InvocationOnMock invocation) throws
@@ -170,10 +167,10 @@ public class MockUtils {
return service;
}
- public static ServerData<IntWritable, IntWritable, IntWritable, IntWritable>
+ public static ServerData<IntWritable, IntWritable, IntWritable>
createNewServerData(ImmutableClassesGiraphConfiguration conf,
Mapper.Context context) {
- return new ServerData<IntWritable, IntWritable, IntWritable, IntWritable>(
+ return new ServerData<IntWritable, IntWritable, IntWritable>(
Mockito.mock(CentralizedServiceWorker.class),
conf,
ByteArrayMessagesPerVertexStore.newFactory(
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/utils/NoOpComputation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/NoOpComputation.java b/giraph-core/src/test/java/org/apache/giraph/utils/NoOpComputation.java
new file mode 100644
index 0000000..630888f
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/NoOpComputation.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * Computation which does nothing, just halts, used for testing
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+public class NoOpComputation<I extends WritableComparable, V extends Writable,
+ E extends Writable, M extends Writable>
+ extends BasicComputation<I, V, E, M> {
+ @Override
+ public void compute(Vertex<I, V, E> vertex,
+ Iterable<M> messages) throws IOException {
+ vertex.voteToHalt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java b/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java
deleted file mode 100644
index c98d580..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/vertices/IntIntNullVertexDoNothing.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.vertices;
-
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-public class IntIntNullVertexDoNothing extends VertexDoNothing<IntWritable,
- IntWritable, NullWritable, NullWritable> {
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java b/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java
deleted file mode 100644
index 9060bc7..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/vertices/VertexCountEdges.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.vertices;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import java.io.IOException;
-
-public class VertexCountEdges extends Vertex<IntWritable, IntWritable,
- NullWritable, NullWritable> {
- @Override
- public void compute(Iterable<NullWritable> messages) throws IOException {
- setValue(new IntWritable(getNumEdges()));
- voteToHalt();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java b/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java
deleted file mode 100644
index fac3fce..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/vertices/VertexDoNothing.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.vertices;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.io.IOException;
-
-public class VertexDoNothing<I extends WritableComparable, V extends Writable,
- E extends Writable, M extends Writable> extends Vertex<I, V, E, M> {
- @Override
- public void compute(Iterable<M> messages) throws IOException {
- voteToHalt();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java b/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
index f9d5544..ed365b4 100644
--- a/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
+++ b/giraph-core/src/test/java/org/apache/giraph/yarn/TestYarnJob.java
@@ -30,6 +30,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.graph.BasicComputation;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.formats.GiraphFileInputFormat;
import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
@@ -61,11 +62,12 @@ public class TestYarnJob implements Watcher {
/**
* Simple No-Op vertex to test if we can run a quick Giraph job on YARN.
*/
- private static class DummyYarnVertex extends Vertex<IntWritable, IntWritable,
- NullWritable, IntWritable> {
+ private static class DummyYarnComputation extends BasicComputation<
+ IntWritable, IntWritable, NullWritable, IntWritable> {
@Override
- public void compute(Iterable<IntWritable> messages) throws IOException {
- voteToHalt();
+ public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex,
+ Iterable<IntWritable> messages) throws IOException {
+ vertex.voteToHalt();
}
}
@@ -203,7 +205,7 @@ public class TestYarnJob implements Watcher {
conf.setEventWaitMsecs(3 * 1000);
conf.setYarnLibJars(""); // no need
conf.setYarnTaskHeapMb(256); // small since no work to be done
- conf.setVertexClass(DummyYarnVertex.class);
+ conf.setComputationClass(DummyYarnComputation.class);
conf.setVertexInputFormatClass(IntIntNullTextInputFormat.class);
conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
conf.setNumComputeThreads(1);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
new file mode 100644
index 0000000..db527f2
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestComputation.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+
+/** Computation which uses aggregators. To be used for testing. */
+public class AggregatorsTestComputation extends
+ BasicComputation<LongWritable, DoubleWritable, FloatWritable,
+ DoubleWritable> {
+
+ /** Name of regular aggregator */
+ private static final String REGULAR_AGG = "regular";
+ /** Name of persistent aggregator */
+ private static final String PERSISTENT_AGG = "persistent";
+ /** Name of master overwriting aggregator */
+ private static final String MASTER_WRITE_AGG = "master";
+ /** Value which master compute will use */
+ private static final long MASTER_VALUE = 12345;
+ /** Prefix for name of aggregators in array */
+ private static final String ARRAY_PREFIX_AGG = "array";
+ /** Number of aggregators to use in array */
+ private static final int NUM_OF_AGGREGATORS_IN_ARRAY = 100;
+
+ @Override
+ public void compute(
+ Vertex<LongWritable, DoubleWritable, FloatWritable> vertex,
+ Iterable<DoubleWritable> messages) throws IOException {
+ long superstep = getSuperstep();
+
+ LongWritable myValue = new LongWritable(1L << superstep);
+ aggregate(REGULAR_AGG, myValue);
+ aggregate(PERSISTENT_AGG, myValue);
+
+ long nv = getTotalNumVertices();
+ if (superstep > 0) {
+ assertEquals(nv * (1L << (superstep - 1)),
+ ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
+ } else {
+ assertEquals(0,
+ ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
+ }
+ assertEquals(nv * ((1L << superstep) - 1),
+ ((LongWritable) getAggregatedValue(PERSISTENT_AGG)).get());
+ assertEquals(MASTER_VALUE * (1L << superstep),
+ ((LongWritable) getAggregatedValue(MASTER_WRITE_AGG)).get());
+
+ for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
+ aggregate(ARRAY_PREFIX_AGG + i, new LongWritable((superstep + 1) * i));
+ assertEquals(superstep * getTotalNumVertices() * i,
+ ((LongWritable) getAggregatedValue(ARRAY_PREFIX_AGG + i)).get());
+ }
+
+ if (getSuperstep() == 10) {
+ vertex.voteToHalt();
+ }
+ }
+
+ /** Master compute which uses aggregators. To be used for testing. */
+ public static class AggregatorsTestMasterCompute extends
+ DefaultMasterCompute {
+ @Override
+ public void compute() {
+ long superstep = getSuperstep();
+
+ LongWritable myValue =
+ new LongWritable(MASTER_VALUE * (1L << superstep));
+ setAggregatedValue(MASTER_WRITE_AGG, myValue);
+
+ long nv = getTotalNumVertices();
+ if (superstep > 0) {
+ assertEquals(nv * (1L << (superstep - 1)),
+ ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
+ } else {
+ assertEquals(0,
+ ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
+ }
+ assertEquals(nv * ((1L << superstep) - 1),
+ ((LongWritable) getAggregatedValue(PERSISTENT_AGG)).get());
+
+ for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
+ assertEquals(superstep * getTotalNumVertices() * i,
+ ((LongWritable) getAggregatedValue(ARRAY_PREFIX_AGG + i)).get());
+ }
+ }
+
+ @Override
+ public void initialize() throws InstantiationException,
+ IllegalAccessException {
+ registerAggregator(REGULAR_AGG, LongSumAggregator.class);
+ registerPersistentAggregator(PERSISTENT_AGG,
+ LongSumAggregator.class);
+ registerAggregator(MASTER_WRITE_AGG, LongSumAggregator.class);
+
+ for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
+ registerAggregator(ARRAY_PREFIX_AGG + i, LongSumAggregator.class);
+ }
+ }
+ }
+
+ /**
+ * Throws exception if values are not equal.
+ *
+ * @param expected Expected value
+ * @param actual Actual value
+ */
+ private static void assertEquals(long expected, long actual) {
+ if (expected != actual) {
+ throw new RuntimeException("expected: " + expected +
+ ", actual: " + actual);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java
deleted file mode 100644
index d08519b..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/AggregatorsTestVertex.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.aggregators.LongSumAggregator;
-import org.apache.giraph.master.DefaultMasterCompute;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-
-import java.io.IOException;
-
-/** Vertex which uses aggrergators. To be used for testing. */
-public class AggregatorsTestVertex extends
- Vertex<LongWritable, DoubleWritable, FloatWritable,
- DoubleWritable> {
-
- /** Name of regular aggregator */
- private static final String REGULAR_AGG = "regular";
- /** Name of persistent aggregator */
- private static final String PERSISTENT_AGG = "persistent";
- /** Name of master overwriting aggregator */
- private static final String MASTER_WRITE_AGG = "master";
- /** Value which master compute will use */
- private static final long MASTER_VALUE = 12345;
- /** Prefix for name of aggregators in array */
- private static final String ARRAY_PREFIX_AGG = "array";
- /** Number of aggregators to use in array */
- private static final int NUM_OF_AGGREGATORS_IN_ARRAY = 100;
-
- @Override
- public void compute(Iterable<DoubleWritable> messages) throws IOException {
- long superstep = getSuperstep();
-
- LongWritable myValue = new LongWritable(1L << superstep);
- aggregate(REGULAR_AGG, myValue);
- aggregate(PERSISTENT_AGG, myValue);
-
- long nv = getTotalNumVertices();
- if (superstep > 0) {
- assertEquals(nv * (1L << (superstep - 1)),
- ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
- } else {
- assertEquals(0,
- ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
- }
- assertEquals(nv * ((1L << superstep) - 1),
- ((LongWritable) getAggregatedValue(PERSISTENT_AGG)).get());
- assertEquals(MASTER_VALUE * (1L << superstep),
- ((LongWritable) getAggregatedValue(MASTER_WRITE_AGG)).get());
-
- for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
- aggregate(ARRAY_PREFIX_AGG + i, new LongWritable((superstep + 1) * i));
- assertEquals(superstep * getTotalNumVertices() * i,
- ((LongWritable) getAggregatedValue(ARRAY_PREFIX_AGG + i)).get());
- }
-
- if (getSuperstep() == 10) {
- voteToHalt();
- }
- }
-
- /** Master compute which uses aggregators. To be used for testing. */
- public static class AggregatorsTestMasterCompute extends
- DefaultMasterCompute {
- @Override
- public void compute() {
- long superstep = getSuperstep();
-
- LongWritable myValue =
- new LongWritable(MASTER_VALUE * (1L << superstep));
- setAggregatedValue(MASTER_WRITE_AGG, myValue);
-
- long nv = getTotalNumVertices();
- if (superstep > 0) {
- assertEquals(nv * (1L << (superstep - 1)),
- ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
- } else {
- assertEquals(0,
- ((LongWritable) getAggregatedValue(REGULAR_AGG)).get());
- }
- assertEquals(nv * ((1L << superstep) - 1),
- ((LongWritable) getAggregatedValue(PERSISTENT_AGG)).get());
-
- for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
- assertEquals(superstep * getTotalNumVertices() * i,
- ((LongWritable) getAggregatedValue(ARRAY_PREFIX_AGG + i)).get());
- }
- }
-
- @Override
- public void initialize() throws InstantiationException,
- IllegalAccessException {
- registerAggregator(REGULAR_AGG, LongSumAggregator.class);
- registerPersistentAggregator(PERSISTENT_AGG,
- LongSumAggregator.class);
- registerAggregator(MASTER_WRITE_AGG, LongSumAggregator.class);
-
- for (int i = 0; i < NUM_OF_AGGREGATORS_IN_ARRAY; i++) {
- registerAggregator(ARRAY_PREFIX_AGG + i, LongSumAggregator.class);
- }
- }
- }
-
- /**
- * Throws exception if values are not equal.
- *
- * @param expected Expected value
- * @param actual Actual value
- */
- private static void assertEquals(long expected, long actual) {
- if (expected != actual) {
- throw new RuntimeException("expected: " + expected +
- ", actual: " + actual);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/ConnectedComponentsComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/ConnectedComponentsComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/ConnectedComponentsComputation.java
new file mode 100644
index 0000000..9b0cfe1
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/ConnectedComponentsComputation.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+import java.io.IOException;
+
+/**
+ * Implementation of the HCC algorithm that identifies connected components and
+ * assigns each vertex its "component identifier" (the smallest vertex id
+ * in the component)
+ *
+ * The idea behind the algorithm is very simple: propagate the smallest
+ * vertex id along the edges to all vertices of a connected component. The
+ * number of supersteps necessary is equal to the length of the maximum
+ * diameter of all components + 1
+ *
+ * The original Hadoop-based variant of this algorithm was proposed by Kang,
+ * Charalampos, Tsourakakis and Faloutsos in
+ * "PEGASUS: Mining Peta-Scale Graphs", 2010
+ *
+ * http://www.cs.cmu.edu/~ukang/papers/PegasusKAIS.pdf
+ */
+@Algorithm(
+ name = "Connected components",
+ description = "Finds connected components of the graph"
+)
+public class ConnectedComponentsComputation extends
+ BasicComputation<IntWritable, IntWritable, NullWritable, IntWritable> {
+ /**
+ * Propagates the smallest vertex id to all neighbors. Will always choose to
+ * halt and only reactivate if a smaller id has been sent to it.
+ *
+ * @param vertex Vertex
+ * @param messages Iterable of messages from the previous superstep.
+ * @throws IOException
+ */
+ @Override
+ public void compute(
+ Vertex<IntWritable, IntWritable, NullWritable> vertex,
+ Iterable<IntWritable> messages) throws IOException {
+ int currentComponent = vertex.getValue().get();
+
+ // First superstep is special, because we can simply look at the neighbors
+ if (getSuperstep() == 0) {
+ for (Edge<IntWritable, NullWritable> edge : vertex.getEdges()) {
+ int neighbor = edge.getTargetVertexId().get();
+ if (neighbor < currentComponent) {
+ currentComponent = neighbor;
+ }
+ }
+ // Only need to send the value if it is not the vertex's own id
+ if (currentComponent != vertex.getValue().get()) {
+ vertex.setValue(new IntWritable(currentComponent));
+ for (Edge<IntWritable, NullWritable> edge : vertex.getEdges()) {
+ IntWritable neighbor = edge.getTargetVertexId();
+ if (neighbor.get() > currentComponent) {
+ sendMessage(neighbor, vertex.getValue());
+ }
+ }
+ }
+
+ vertex.voteToHalt();
+ return;
+ }
+
+ boolean changed = false;
+ // Did we get a smaller id?
+ for (IntWritable message : messages) {
+ int candidateComponent = message.get();
+ if (candidateComponent < currentComponent) {
+ currentComponent = candidateComponent;
+ changed = true;
+ }
+ }
+
+ // propagate new component id to the neighbors
+ if (changed) {
+ vertex.setValue(new IntWritable(currentComponent));
+ sendMessageToAllEdges(vertex, vertex.getValue());
+ }
+ vertex.voteToHalt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java
deleted file mode 100644
index dbeb6bf..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/ConnectedComponentsVertex.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-
-import java.io.IOException;
-
-/**
- * Implementation of the HCC algorithm that identifies connected components and
- * assigns each vertex its "component identifier" (the smallest vertex id
- * in the component)
- *
- * The idea behind the algorithm is very simple: propagate the smallest
- * vertex id along the edges to all vertices of a connected component. The
- * number of supersteps necessary is equal to the length of the maximum
- * diameter of all components + 1
- *
- * The original Hadoop-based variant of this algorithm was proposed by Kang,
- * Charalampos, Tsourakakis and Faloutsos in
- * "PEGASUS: Mining Peta-Scale Graphs", 2010
- *
- * http://www.cs.cmu.edu/~ukang/papers/PegasusKAIS.pdf
- */
-@Algorithm(
- name = "Connected components",
- description = "Finds connected components of the graph"
-)
-public class ConnectedComponentsVertex extends Vertex<IntWritable,
- IntWritable, NullWritable, IntWritable> {
- /**
- * Propagates the smallest vertex id to all neighbors. Will always choose to
- * halt and only reactivate if a smaller id has been sent to it.
- *
- * @param messages Iterator of messages from the previous superstep.
- * @throws IOException
- */
- @Override
- public void compute(Iterable<IntWritable> messages) throws IOException {
- int currentComponent = getValue().get();
-
- // First superstep is special, because we can simply look at the neighbors
- if (getSuperstep() == 0) {
- for (Edge<IntWritable, NullWritable> edge : getEdges()) {
- int neighbor = edge.getTargetVertexId().get();
- if (neighbor < currentComponent) {
- currentComponent = neighbor;
- }
- }
- // Only need to send value if it is not the own id
- if (currentComponent != getValue().get()) {
- setValue(new IntWritable(currentComponent));
- for (Edge<IntWritable, NullWritable> edge : getEdges()) {
- IntWritable neighbor = edge.getTargetVertexId();
- if (neighbor.get() > currentComponent) {
- sendMessage(neighbor, getValue());
- }
- }
- }
-
- voteToHalt();
- return;
- }
-
- boolean changed = false;
- // did we get a smaller id ?
- for (IntWritable message : messages) {
- int candidateComponent = message.get();
- if (candidateComponent < currentComponent) {
- currentComponent = candidateComponent;
- changed = true;
- }
- }
-
- // propagate new component id to the neighbors
- if (changed) {
- setValue(new IntWritable(currentComponent));
- sendMessageToAllEdges(getValue());
- }
- voteToHalt();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/IdentityComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/IdentityComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/IdentityComputation.java
new file mode 100644
index 0000000..12a58d5
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/IdentityComputation.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * User applications can subclass IdentityComputation, which
+ * simply prints the results that have been read for testing IO related
+ * jobs under any inputformat
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message value
+ */
+@SuppressWarnings("rawtypes")
+public class IdentityComputation<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ extends BasicComputation<I, V, E, M> {
+ @Override
+ public void compute(Vertex<I, V, E> vertex,
+ Iterable<M> messages) throws IOException {
+ vertex.voteToHalt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/IdentityVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/IdentityVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/IdentityVertex.java
deleted file mode 100644
index 30cca86..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/IdentityVertex.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * User applications can subclass IdentityVertex, which
- * simply prints the results that have been read for testing IO related
- * jobs under any inputformat
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-
-public abstract class IdentityVertex<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends Vertex<I, V, E, M> {
-
- @Override
- public void compute(Iterable<M> messages) {
- voteToHalt();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleDoubleTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleDoubleTextInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleDoubleTextInputFormat.java
index 62bea5a..f56b4f6 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleDoubleTextInputFormat.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleDoubleTextInputFormat.java
@@ -27,7 +27,6 @@ import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -44,10 +43,10 @@ public class LongDoubleDoubleTextInputFormat
extends TextVertexInputFormat<LongWritable, DoubleWritable,
DoubleWritable>
implements ImmutableClassesGiraphConfigurable<LongWritable, DoubleWritable,
- DoubleWritable, Writable> {
+ DoubleWritable> {
/** Configuration. */
private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
- DoubleWritable, Writable> conf;
+ DoubleWritable> conf;
@Override
public TextVertexReader createVertexReader(InputSplit split,
@@ -58,13 +57,13 @@ public class LongDoubleDoubleTextInputFormat
@Override
public void setConf(ImmutableClassesGiraphConfiguration<LongWritable,
- DoubleWritable, DoubleWritable, Writable> configuration) {
+ DoubleWritable, DoubleWritable> configuration) {
this.conf = configuration;
}
@Override
public ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
- DoubleWritable, Writable> getConf() {
+ DoubleWritable> getConf() {
return conf;
}
@@ -79,9 +78,9 @@ public class LongDoubleDoubleTextInputFormat
private final Pattern separator = Pattern.compile("[\t ]");
@Override
- public Vertex<LongWritable, DoubleWritable, DoubleWritable, ?>
+ public Vertex<LongWritable, DoubleWritable, DoubleWritable>
getCurrentVertex() throws IOException, InterruptedException {
- Vertex<LongWritable, DoubleWritable, DoubleWritable, ?>
+ Vertex<LongWritable, DoubleWritable, DoubleWritable>
vertex = conf.createVertex();
String[] tokens =
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullTextInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullTextInputFormat.java
index fdc9050..bfb5f40 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullTextInputFormat.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/LongDoubleNullTextInputFormat.java
@@ -28,7 +28,6 @@ import org.apache.giraph.io.formats.TextVertexInputFormat;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -42,10 +41,10 @@ import java.util.regex.Pattern;
public class LongDoubleNullTextInputFormat
extends TextVertexInputFormat<LongWritable, DoubleWritable, NullWritable>
implements ImmutableClassesGiraphConfigurable<LongWritable, DoubleWritable,
- NullWritable, Writable> {
+ NullWritable> {
/** Configuration. */
private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
- NullWritable, Writable> conf;
+ NullWritable> conf;
@Override
public TextVertexReader createVertexReader(InputSplit split,
@@ -56,13 +55,13 @@ public class LongDoubleNullTextInputFormat
@Override
public void setConf(ImmutableClassesGiraphConfiguration<LongWritable,
- DoubleWritable, NullWritable, Writable> configuration) {
+ DoubleWritable, NullWritable> configuration) {
this.conf = configuration;
}
@Override
public ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
- NullWritable, Writable> getConf() {
+ NullWritable> getConf() {
return conf;
}
@@ -77,9 +76,9 @@ public class LongDoubleNullTextInputFormat
private final Pattern separator = Pattern.compile("[\t ]");
@Override
- public Vertex<LongWritable, DoubleWritable, NullWritable, ?>
+ public Vertex<LongWritable, DoubleWritable, NullWritable>
getCurrentVertex() throws IOException, InterruptedException {
- Vertex<LongWritable, DoubleWritable, NullWritable, ?>
+ Vertex<LongWritable, DoubleWritable, NullWritable>
vertex = conf.createVertex();
String[] tokens =
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleDoubleTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleDoubleTextInputFormat.java b/giraph-examples/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleDoubleTextInputFormat.java
index 7dc8475..5023a4e 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleDoubleTextInputFormat.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/NormalizingLongDoubleDoubleTextInputFormat.java
@@ -27,7 +27,6 @@ import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -45,10 +44,10 @@ public class NormalizingLongDoubleDoubleTextInputFormat
extends
TextVertexInputFormat<LongWritable, DoubleWritable, DoubleWritable>
implements ImmutableClassesGiraphConfigurable<LongWritable, DoubleWritable,
- DoubleWritable, Writable> {
+ DoubleWritable> {
/** Configuration. */
private ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
- DoubleWritable, Writable> conf;
+ DoubleWritable> conf;
@Override
public TextVertexReader createVertexReader(
@@ -58,13 +57,13 @@ public class NormalizingLongDoubleDoubleTextInputFormat
@Override
public void setConf(ImmutableClassesGiraphConfiguration<LongWritable,
- DoubleWritable, DoubleWritable, Writable> configuration) {
+ DoubleWritable, DoubleWritable> configuration) {
conf = configuration;
}
@Override
public ImmutableClassesGiraphConfiguration<LongWritable, DoubleWritable,
- DoubleWritable, Writable> getConf() {
+ DoubleWritable> getConf() {
return conf;
}
@@ -81,10 +80,10 @@ public class NormalizingLongDoubleDoubleTextInputFormat
@Override
public Vertex<LongWritable, DoubleWritable,
- DoubleWritable, ?> getCurrentVertex()
+ DoubleWritable> getCurrentVertex()
throws IOException, InterruptedException {
- Vertex<LongWritable, DoubleWritable,
- DoubleWritable, ?> vertex = conf.createVertex();
+ Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex =
+ conf.createVertex();
String[] tokens = edgeSeparator.split(getRecordReader()
.getCurrentValue().toString());
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankComputation.java
new file mode 100644
index 0000000..9ac90d9
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankComputation.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.MathUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * The PageRank algorithm, with uniform transition probabilities on the edges
+ * http://en.wikipedia.org/wiki/PageRank
+ */
+public class PageRankComputation extends RandomWalkComputation<NullWritable> {
+ @Override
+ protected double transitionProbability(
+ Vertex<LongWritable, DoubleWritable, NullWritable> vertex,
+ double stateProbability, Edge<LongWritable, NullWritable> edge) {
+ return stateProbability / vertex.getNumEdges();
+ }
+
+ @Override
+ protected double recompute(
+ Vertex<LongWritable, DoubleWritable, NullWritable> vertex,
+ Iterable<DoubleWritable> partialRanks, double teleportationProbability) {
+ // rank contribution from incident neighbors
+ double rankFromNeighbors = MathUtils.sum(partialRanks);
+ // rank contribution from dangling vertices
+ double danglingContribution =
+ getDanglingProbability() / getTotalNumVertices();
+
+ // recompute rank
+ return (1d - teleportationProbability) *
+ (rankFromNeighbors + danglingContribution) +
+ teleportationProbability / getTotalNumVertices();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
deleted file mode 100644
index 9678b31..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankVertex.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.utils.MathUtils;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-
-/**
- * The PageRank algorithm, with uniform transition probabilities on the edges
- * http://en.wikipedia.org/wiki/PageRank
- */
-public class PageRankVertex extends RandomWalkVertex<NullWritable> {
-
- @Override
- protected double transitionProbability(double stateProbability,
- Edge<LongWritable, NullWritable> edge) {
- return stateProbability / getNumEdges();
- }
-
- @Override
- protected double recompute(Iterable<DoubleWritable> partialRanks,
- double teleportationProbability) {
-
- // rank contribution from incident neighbors
- double rankFromNeighbors = MathUtils.sum(partialRanks);
- // rank contribution from dangling vertices
- double danglingContribution =
- getDanglingProbability() / getTotalNumVertices();
-
- // recompute rank
- return (1d - teleportationProbability) *
- (rankFromNeighbors + danglingContribution) +
- teleportationProbability / getTotalNumVertices();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java
deleted file mode 100644
index f617d8e..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.partition.DefaultPartitionContext;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.worker.DefaultWorkerContext;
-import org.apache.giraph.worker.WorkerContext;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.LongWritable;
-
-import java.io.IOException;
-
-/**
- * Vertex to test the functionality of PartitionContext
- */
-public class PartitionContextTestVertex extends
- Vertex<LongWritable, DoubleWritable, FloatWritable,
- DoubleWritable> {
- /** How many compute threads to use in the test */
- public static final int NUM_COMPUTE_THREADS = 10;
- /** How many vertices to create for the test */
- public static final int NUM_VERTICES = 100;
- /** How many partitions to have */
- public static final int NUM_PARTITIONS = 25;
-
- @Override
- public void compute(Iterable<DoubleWritable> messages) throws IOException {
- TestPartitionContextPartitionContext partitionContext =
- (TestPartitionContextPartitionContext) getPartitionContext();
- partitionContext.counter++;
- if (getSuperstep() > 5) {
- voteToHalt();
- }
- }
-
- /**
- * PartitionContext for TestPartitionContext
- */
- public static class TestPartitionContextPartitionContext extends
- DefaultPartitionContext {
- /**
- * The counter should hold the number of vertices in this partition,
- * plus the current superstep
- */
- private long counter;
-
- @Override
- public void preSuperstep(WorkerContext workerContext) {
- counter =
- ((TestPartitionContextWorkerContext) workerContext).superstepCounter;
- }
-
- @Override
- public void postSuperstep(WorkerContext workerContext) {
- ((TestPartitionContextWorkerContext) workerContext).totalCounter +=
- counter;
- }
- }
-
- /**
- * WorkerContext for TestPartitionContext
- */
- public static class TestPartitionContextWorkerContext extends
- DefaultWorkerContext {
- /** Current superstep */
- private long superstepCounter;
- /**
- * This counter should hold the sum of PartitionContext's counters
- */
- private long totalCounter;
-
- @Override
- public void preSuperstep() {
- superstepCounter = getSuperstep();
- totalCounter = 0;
- }
-
- @Override
- public void postSuperstep() {
- assertEquals(totalCounter,
- NUM_PARTITIONS * superstepCounter + getTotalNumVertices());
- }
- }
-
- /**
- * Throws exception if values are not equal.
- *
- * @param expected Expected value
- * @param actual Actual value
- */
- private static void assertEquals(long expected, long actual) {
- if (expected != actual) {
- throw new RuntimeException("expected: " + expected +
- ", actual: " + actual);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkComputation.java
new file mode 100644
index 0000000..ed95aae
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkComputation.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+
+/**
+ * Base class for executing a random walk on a graph
+ *
+ * @param <E> edge type
+ */
+public abstract class RandomWalkComputation<E extends Writable>
+ extends BasicComputation<LongWritable, DoubleWritable, E, DoubleWritable> {
+ /** Configuration parameter for the number of supersteps to execute */
+ static final String MAX_SUPERSTEPS = RandomWalkComputation.class.getName() +
+ ".maxSupersteps";
+ /** Configuration parameter for the teleportation probability */
+ static final String TELEPORTATION_PROBABILITY = RandomWalkComputation.class
+ .getName() + ".teleportationProbability";
+ /** Name of aggregator for the probability of dangling vertices */
+ static final String CUMULATIVE_DANGLING_PROBABILITY =
+ RandomWalkComputation.class.getName() + ".cumulativeDanglingProbability";
+ /** Name of aggregator for the probability of all vertices */
+ static final String CUMULATIVE_PROBABILITY = RandomWalkComputation.class
+ .getName() + ".cumulativeProbability";
+ /** Name of aggregator for the number of dangling vertices */
+ static final String NUM_DANGLING_VERTICES = RandomWalkComputation.class
+ .getName() + ".numDanglingVertices";
+ /** Name of aggregator for the L1 norm of the probability difference, used
+ * for convergence detection */
+ static final String L1_NORM_OF_PROBABILITY_DIFFERENCE =
+ RandomWalkComputation.class.getName() + ".l1NormOfProbabilityDifference";
+ /** Reusable {@link DoubleWritable} instance to avoid object instantiation */
+ private final DoubleWritable doubleWritable = new DoubleWritable();
+ /** Reusable {@link LongWritable} for counting dangling vertices */
+ private final LongWritable one = new LongWritable(1);
+
+ /**
+ * Compute an initial probability value for the vertex. Per default,
+ * we start with a uniform distribution.
+ * @return The initial probability value.
+ */
+ protected double initialProbability() {
+ return 1.0 / getTotalNumVertices();
+ }
+
+ /**
+ * Compute the probability of transitioning to a neighbor vertex
+ * @param vertex Vertex
+ * @param stateProbability current steady state probability of the vertex
+ * @param edge edge to neighbor
+ * @return the probability of transitioning to a neighbor vertex
+ */
+ protected abstract double transitionProbability(
+ Vertex<LongWritable, DoubleWritable, E> vertex,
+ double stateProbability,
+ Edge<LongWritable, E> edge);
+
+ /**
+ * Perform a single step of a random walk computation.
+ * @param vertex Vertex
+ * @param messages Messages received in the previous step.
+ * @param teleportationProbability Probability of teleporting to another
+ * vertex.
+ * @return The new probability distribution value.
+ */
+ protected abstract double recompute(
+ Vertex<LongWritable, DoubleWritable, E> vertex,
+ Iterable<DoubleWritable> messages,
+ double teleportationProbability);
+
+ /**
+ * Returns the cumulative probability from dangling vertices.
+ * @return The cumulative probability from dangling vertices.
+ */
+ protected double getDanglingProbability() {
+ return this.<DoubleWritable>getAggregatedValue(
+ RandomWalkComputation.CUMULATIVE_DANGLING_PROBABILITY).get();
+ }
+
+ /**
+ * Returns the cumulative probability of all vertices.
+ * @return The cumulative probability of all vertices.
+ */
+ protected double getPreviousCumulativeProbability() {
+ return this.<DoubleWritable>getAggregatedValue(
+ RandomWalkComputation.CUMULATIVE_PROBABILITY).get();
+ }
+
+ @Override
+ public void compute(
+ Vertex<LongWritable, DoubleWritable, E> vertex,
+ Iterable<DoubleWritable> messages) throws IOException {
+ double stateProbability;
+
+ if (getSuperstep() > 0) {
+
+ double previousStateProbability = vertex.getValue().get();
+ stateProbability =
+ recompute(vertex, messages, teleportationProbability());
+
+ // Important: rescale for numerical stability
+ stateProbability /= getPreviousCumulativeProbability();
+
+ doubleWritable.set(Math.abs(stateProbability - previousStateProbability));
+ aggregate(L1_NORM_OF_PROBABILITY_DIFFERENCE, doubleWritable);
+
+ } else {
+ stateProbability = initialProbability();
+ }
+
+ vertex.getValue().set(stateProbability);
+
+ aggregate(CUMULATIVE_PROBABILITY, vertex.getValue());
+
+ // Compute dangling node contribution for next superstep
+ if (vertex.getNumEdges() == 0) {
+ aggregate(NUM_DANGLING_VERTICES, one);
+ aggregate(CUMULATIVE_DANGLING_PROBABILITY, vertex.getValue());
+ }
+
+ if (getSuperstep() < maxSupersteps()) {
+ for (Edge<LongWritable, E> edge : vertex.getEdges()) {
+ double transitionProbability =
+ transitionProbability(vertex, stateProbability, edge);
+ doubleWritable.set(transitionProbability);
+ sendMessage(edge.getTargetVertexId(), doubleWritable);
+ }
+ } else {
+ vertex.voteToHalt();
+ }
+ }
+
+ /**
+ * Reads the number of supersteps to execute from the worker context
+ * @return number of supersteps to execute
+ */
+ private int maxSupersteps() {
+ return ((RandomWalkWorkerContext) getWorkerContext()).getMaxSupersteps();
+ }
+
+ /**
+ * Reads the teleportation probability from the worker context
+ * @return teleportation probability
+ */
+ protected double teleportationProbability() {
+ return ((RandomWalkWorkerContext) getWorkerContext())
+ .getTeleportationProbability();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
deleted file mode 100644
index 2d2c988..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertex.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import java.io.IOException;
-
-/**
- * Base class for executing a random walk on a graph
- *
- * @param <E> edge type
- */
-public abstract class RandomWalkVertex<E extends Writable>
- extends Vertex<LongWritable, DoubleWritable, E, DoubleWritable> {
- /** Configuration parameter for the number of supersteps to execute */
- static final String MAX_SUPERSTEPS = RandomWalkVertex.class.getName() +
- ".maxSupersteps";
- /** Configuration parameter for the teleportation probability */
- static final String TELEPORTATION_PROBABILITY = RandomWalkVertex.class
- .getName() + ".teleportationProbability";
- /** Name of aggregator for the probability of dangling vertices */
- static final String CUMULATIVE_DANGLING_PROBABILITY = RandomWalkVertex.class
- .getName() + ".cumulativeDanglingProbability";
- /** Name of aggregator for the probability of all vertices */
- static final String CUMULATIVE_PROBABILITY = RandomWalkVertex.class
- .getName() + ".cumulativeProbability";
- /** Name of aggregator for the probability of dangling vertices */
- static final String NUM_DANGLING_VERTICES = RandomWalkVertex.class
- .getName() + ".numDanglingVertices";
- /** Name of aggregator for the L1 norm of the probability difference, used
- * for covergence detection */
- static final String L1_NORM_OF_PROBABILITY_DIFFERENCE = RandomWalkVertex.class
- .getName() + ".l1NormOfProbabilityDifference";
- /** Reusable {@link DoubleWritable} instance to avoid object instantiation */
- private final DoubleWritable doubleWritable = new DoubleWritable();
- /** Reusable {@link LongWritable} for counting dangling vertices */
- private final LongWritable one = new LongWritable(1);
-
- /**
- * Compute an initial probability value for the vertex. Per default,
- * we start with a uniform distribution.
- * @return The initial probability value.
- */
- protected double initialProbability() {
- return 1.0 / getTotalNumVertices();
- }
-
- /**
- * Compute the probability of transitioning to a neighbor vertex
- * @param stateProbability current steady state probability of the vertex
- * @param edge edge to neighbor
- * @return the probability of transitioning to a neighbor vertex
- */
- protected abstract double transitionProbability(double stateProbability,
- Edge<LongWritable, E> edge);
-
- /**
- * Perform a single step of a random walk computation.
- * @param messages Messages received in the previous step.
- * @param teleportationProbability Probability of teleporting to another
- * vertex.
- * @return The new probability distribution value.
- */
- protected abstract double recompute(Iterable<DoubleWritable> messages,
- double teleportationProbability);
-
- /**
- * Returns the cumulative probability from dangling vertices.
- * @return The cumulative probability from dangling vertices.
- */
- protected double getDanglingProbability() {
- return this.<DoubleWritable>getAggregatedValue(
- RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get();
- }
-
- /**
- * Returns the cumulative probability from dangling vertices.
- * @return The cumulative probability from dangling vertices.
- */
- protected double getPreviousCumulativeProbability() {
- return this.<DoubleWritable>getAggregatedValue(
- RandomWalkVertex.CUMULATIVE_PROBABILITY).get();
- }
-
- @Override
- public void compute(Iterable<DoubleWritable> messages) throws IOException {
- double stateProbability;
-
- if (getSuperstep() > 0) {
-
- double previousStateProbability = getValue().get();
- stateProbability = recompute(messages, teleportationProbability());
-
- // Important: rescale for numerical stability
- stateProbability /= getPreviousCumulativeProbability();
-
- doubleWritable.set(Math.abs(stateProbability - previousStateProbability));
- aggregate(L1_NORM_OF_PROBABILITY_DIFFERENCE, doubleWritable);
-
- } else {
- stateProbability = initialProbability();
- }
-
- getValue().set(stateProbability);
-
- aggregate(CUMULATIVE_PROBABILITY, getValue());
-
- // Compute dangling node contribution for next superstep
- if (getNumEdges() == 0) {
- aggregate(NUM_DANGLING_VERTICES, one);
- aggregate(CUMULATIVE_DANGLING_PROBABILITY, getValue());
- }
-
- if (getSuperstep() < maxSupersteps()) {
- for (Edge<LongWritable, E> edge : getEdges()) {
- double transitionProbability =
- transitionProbability(stateProbability, edge);
- doubleWritable.set(transitionProbability);
- sendMessage(edge.getTargetVertexId(), doubleWritable);
- }
- } else {
- voteToHalt();
- }
- }
-
- /**
- * Reads the number of supersteps to execute from the configuration
- * @return number of supersteps to execute
- */
- private int maxSupersteps() {
- return ((RandomWalkWorkerContext) getWorkerContext()).getMaxSupersteps();
- }
-
- /**
- * Reads the teleportation probability from the configuration
- * @return teleportation probability
- */
- protected double teleportationProbability() {
- return ((RandomWalkWorkerContext) getWorkerContext())
- .getTeleportationProbability();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java
index 9e5dbbf..8b5f23b 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkVertexMasterCompute.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.log4j.Logger;
/**
- * Master compute associated with {@link RandomWalkVertex}. It handles
+ * Master compute associated with {@link RandomWalkComputation}. It handles
* dangling nodes.
*/
public class RandomWalkVertexMasterCompute extends DefaultMasterCompute {
@@ -42,16 +42,16 @@ public class RandomWalkVertexMasterCompute extends DefaultMasterCompute {
public void compute() {
double danglingContribution =
this.<DoubleWritable>getAggregatedValue(
- RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY).get();
+ RandomWalkComputation.CUMULATIVE_DANGLING_PROBABILITY).get();
double cumulativeProbability =
this.<DoubleWritable>getAggregatedValue(
- RandomWalkVertex.CUMULATIVE_PROBABILITY).get();
+ RandomWalkComputation.CUMULATIVE_PROBABILITY).get();
double l1NormOfStateDiff =
this.<DoubleWritable>getAggregatedValue(
- RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE).get();
+ RandomWalkComputation.L1_NORM_OF_PROBABILITY_DIFFERENCE).get();
long numDanglingVertices =
this.<LongWritable>getAggregatedValue(
- RandomWalkVertex.NUM_DANGLING_VERTICES).get();
+ RandomWalkComputation.NUM_DANGLING_VERTICES).get();
LOG.info("[Superstep " + getSuperstep() + "] Dangling contribution = " +
danglingContribution + ", number of dangling vertices = " +
@@ -69,13 +69,13 @@ public class RandomWalkVertexMasterCompute extends DefaultMasterCompute {
@Override
public void initialize() throws InstantiationException,
IllegalAccessException {
- registerAggregator(RandomWalkVertex.NUM_DANGLING_VERTICES,
+ registerAggregator(RandomWalkComputation.NUM_DANGLING_VERTICES,
LongSumAggregator.class);
- registerAggregator(RandomWalkVertex.CUMULATIVE_DANGLING_PROBABILITY,
+ registerAggregator(RandomWalkComputation.CUMULATIVE_DANGLING_PROBABILITY,
DoubleSumAggregator.class);
- registerAggregator(RandomWalkVertex.CUMULATIVE_PROBABILITY,
+ registerAggregator(RandomWalkComputation.CUMULATIVE_PROBABILITY,
DoubleSumAggregator.class);
- registerAggregator(RandomWalkVertex.L1_NORM_OF_PROBABILITY_DIFFERENCE,
+ registerAggregator(RandomWalkComputation.L1_NORM_OF_PROBABILITY_DIFFERENCE,
DoubleSumAggregator.class);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartComputation.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartComputation.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartComputation.java
new file mode 100644
index 0000000..94e5d60
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartComputation.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.base.Preconditions;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.utils.MathUtils;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Executes "RandomWalkWithRestart", a random walk on the graph which is biased
+ * towards a source vertex. The resulting probabilities of staying at a given
+ * vertex can be interpreted as a measure of proximity to the source vertex.
+ */
+public class RandomWalkWithRestartComputation
+ extends RandomWalkComputation<DoubleWritable> {
+
+ /** Configuration parameter for the source vertex */
+ static final String SOURCE_VERTEX = RandomWalkWithRestartComputation.class
+ .getName() + ".sourceVertex";
+
+ /**
+ * Checks whether the given vertex is one of the source vertices
+ * @param vertex Vertex to check
+ * @return true if the given vertex is a source vertex
+ */
+ private boolean isSourceVertex(Vertex<LongWritable, ?, ?> vertex) {
+ return ((RandomWalkWorkerContext) getWorkerContext()).isSource(
+ vertex.getId().get());
+ }
+
+ /**
+ * Returns the number of source vertices.
+ * @return The number of source vertices.
+ */
+ private int numSourceVertices() {
+ return ((RandomWalkWorkerContext) getWorkerContext()).numSources();
+ }
+
+ @Override
+ protected double transitionProbability(
+ Vertex<LongWritable, DoubleWritable, DoubleWritable>
+ vertex,
+ double stateProbability, Edge<LongWritable, DoubleWritable> edge) {
+ return stateProbability * edge.getValue().get();
+ }
+
+ @Override
+ protected double recompute(
+ Vertex<LongWritable, DoubleWritable, DoubleWritable> vertex,
+ Iterable<DoubleWritable> transitionProbabilities,
+ double teleportationProbability) {
+ int numSourceVertices = numSourceVertices();
+ Preconditions.checkState(numSourceVertices > 0, "No source vertex found");
+
+ double stateProbability = MathUtils.sum(transitionProbabilities);
+ // Add the contribution of dangling nodes (weakly preferential
+ // implementation: dangling nodes redistribute uniformly)
+ stateProbability += getDanglingProbability() / getTotalNumVertices();
+ // The random walk might teleport back to one of the source vertices
+ stateProbability *= 1 - teleportationProbability;
+ if (isSourceVertex(vertex)) {
+ stateProbability += teleportationProbability / numSourceVertices;
+ }
+ return stateProbability;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
deleted file mode 100644
index 6f3eb6c..0000000
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWithRestartVertex.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.examples;
-
-import com.google.common.base.Preconditions;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.utils.MathUtils;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-
-/**
- * Executes "RandomWalkWithRestart", a random walk on the graph which is biased
- * towards a source vertex. The resulting probabilities of staying at a given
- * vertex can be interpreted as a measure of proximity to the source vertex.
- */
-public class RandomWalkWithRestartVertex
- extends RandomWalkVertex<DoubleWritable> {
-
- /** Configuration parameter for the source vertex */
- static final String SOURCE_VERTEX = RandomWalkWithRestartVertex.class
- .getName() + ".sourceVertex";
-
- /**
- * Checks whether the currently executed vertex is the source vertex
- * @return is the currently executed vertex the source vertex?
- */
- private boolean isSourceVertex() {
- return ((RandomWalkWorkerContext) getWorkerContext()).isSource(getId()
- .get());
- }
-
- /**
- * Returns the number of source vertices.
- * @return The number of source vertices.
- */
- private int numSourceVertices() {
- return ((RandomWalkWorkerContext) getWorkerContext()).numSources();
- }
-
- @Override
- protected double transitionProbability(double stateProbability,
- Edge<LongWritable, DoubleWritable> edge) {
- return stateProbability * edge.getValue().get();
- }
-
- @Override
- protected double recompute(Iterable<DoubleWritable> transitionProbabilities,
- double teleportationProbability) {
-
- int numSourceVertices = numSourceVertices();
- Preconditions.checkState(numSourceVertices > 0, "No source vertex found");
-
- double stateProbability = MathUtils.sum(transitionProbabilities);
- // Add the contribution of dangling nodes (weakly preferential
- // implementation: dangling nodes redistribute uniformly)
- stateProbability += getDanglingProbability() / getTotalNumVertices();
- // The random walk might teleport back to one of the source vertexes
- stateProbability *= 1 - teleportationProbability;
- if (isSourceVertex()) {
- stateProbability += teleportationProbability / numSourceVertices;
- }
- return stateProbability;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8811165e/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
index 2566f43..5c23b5a 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/RandomWalkWorkerContext.java
@@ -48,8 +48,8 @@ public class RandomWalkWorkerContext extends WorkerContext {
private static Set<Long> SOURCES;
/** Configuration parameter for the source vertex */
- private static final String SOURCE_VERTEX = RandomWalkWithRestartVertex.class
- .getName() + ".sourceVertex";
+ private static final String SOURCE_VERTEX =
+ RandomWalkWithRestartComputation.class.getName() + ".sourceVertex";
/** Logger */
private static final Logger LOG = Logger
@@ -143,10 +143,10 @@ public class RandomWalkWorkerContext extends WorkerContext {
public void preApplication() throws InstantiationException,
IllegalAccessException {
Configuration configuration = getContext().getConfiguration();
- MAX_SUPERSTEPS = configuration.getInt(RandomWalkVertex.MAX_SUPERSTEPS,
+ MAX_SUPERSTEPS = configuration.getInt(RandomWalkComputation.MAX_SUPERSTEPS,
DEFAULT_MAX_SUPERSTEPS);
TELEPORTATION_PROBABILITY = configuration.getFloat(
- RandomWalkVertex.TELEPORTATION_PROBABILITY,
+ RandomWalkComputation.TELEPORTATION_PROBABILITY,
DEFAULT_TELEPORTATION_PROBABILITY);
SOURCES = initializeSources(configuration);
}