You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2017/06/02 17:14:31 UTC
git commit: updated refs/heads/trunk to e2f82b2
Repository: giraph
Updated Branches:
refs/heads/trunk 02e73b917 -> e2f82b25d
JIRA-1148
closes #39
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/e2f82b25
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/e2f82b25
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/e2f82b25
Branch: refs/heads/trunk
Commit: e2f82b25d19faa263bbdbdf074948c6537d85e3a
Parents: 02e73b9
Author: Maja Kabiljo <ma...@fb.com>
Authored: Fri Jun 2 10:14:18 2017 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Fri Jun 2 10:14:18 2017 -0700
----------------------------------------------------------------------
.../UndirectedConnectedComponents.java | 11 ++-
.../apache/giraph/block_app/library/Pieces.java | 90 ++++++++++++++++++++
2 files changed, 98 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/e2f82b25/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
index fb04fa8..4398c06 100644
--- a/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
+++ b/giraph-block-app-8/src/main/java/org/apache/giraph/block_app/library/prepare_graph/UndirectedConnectedComponents.java
@@ -352,10 +352,15 @@ public class UndirectedConnectedComponents {
Pair<LongWritable, LongWritable> componentToReducePair = Pair.of(
new LongWritable(), new LongWritable(1));
LongWritable reusableLong = new LongWritable();
- return Pieces.reduceAndBroadcast(
- "CalcConnectedComponentSizes",
+ // This reduce operation is stateless so we can use a single instance
+ BasicMapReduce<LongWritable, LongWritable, LongWritable> reduceOperation =
new BasicMapReduce<>(
- LongTypeOps.INSTANCE, LongTypeOps.INSTANCE, SumReduce.LONG),
+ LongTypeOps.INSTANCE, LongTypeOps.INSTANCE, SumReduce.LONG);
+ return Pieces.reduceAndBroadcastWithArrayOfHandles(
+ "CalcConnectedComponentSizes",
+ 3137, /* Just using some large prime number */
+ () -> reduceOperation,
+ vertex -> getComponent.get(vertex).get(),
(Vertex<LongWritable, V, Writable> vertex) -> {
componentToReducePair.getLeft().set(getComponent.get(vertex).get());
return componentToReducePair;
http://git-wip-us.apache.org/repos/asf/giraph/blob/e2f82b25/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
index 614f4ba..587ae65 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/library/Pieces.java
@@ -17,7 +17,9 @@
*/
package org.apache.giraph.block_app.library;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import org.apache.giraph.block_app.framework.api.BlockMasterApi;
import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi;
@@ -26,13 +28,16 @@ import org.apache.giraph.block_app.framework.api.CreateReducersApi;
import org.apache.giraph.block_app.framework.piece.Piece;
import org.apache.giraph.block_app.framework.piece.global_comm.ReducerAndBroadcastWrapperHandle;
import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle;
+import org.apache.giraph.block_app.framework.piece.global_comm.array.BroadcastArrayHandle;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver;
import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
import org.apache.giraph.block_app.library.internal.SendMessagePiece;
import org.apache.giraph.block_app.library.internal.SendMessageWithCombinerPiece;
+import org.apache.giraph.block_app.reducers.array.ArrayOfHandles;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.function.Consumer;
import org.apache.giraph.function.PairConsumer;
+import org.apache.giraph.function.Supplier;
import org.apache.giraph.function.vertex.ConsumerWithVertex;
import org.apache.giraph.function.vertex.SupplierFromVertex;
import org.apache.giraph.graph.Vertex;
@@ -320,6 +325,91 @@ public class Pieces {
}
/**
+   * Like reduceAndBroadcast, but uses an array of handles for reducers and
+   * broadcasts, to make it feasible and performant when reduced values are
+   * large: values are spread across numHandles separate handles instead of
+   * a single one.
+   * Each vertex's value is reduced into the handle with index
+   * {@code |handleHashSupplier(vertex) % numHandles|}. After masterCompute
+   * broadcasts the reduced values, reducedValueConsumer receives, for each
+   * vertex, the broadcast value of that same handle.
+   *
+   * @param name Name of the piece, returned by its toString()
+   * @param numHandles Number of reducer/broadcast handles; must be positive
+   * @param reduceOp Supplier of the reduce operation, invoked each time a
+   *                 handle's reducer is created
+   * @param handleHashSupplier Hash used to pick the handle for each vertex
+   * @param valueSupplier Value each vertex contributes to its handle
+   * @param reducedValueConsumer Consumer receiving, for each vertex, the
+   *                             reduced value broadcast from its handle
+   * @param <S> Single value type, objects passed on workers
+   * @param <R> Reduced value type
+   * @param <I> Vertex id type
+   * @param <V> Vertex value type
+   * @param <E> Edge value type
+   * @return Piece that reduces and broadcasts values through numHandles
+   *         handles
+   * @throws IllegalArgumentException if numHandles is not positive
+   */
+  public static
+  <S, R extends Writable, I extends WritableComparable, V extends Writable,
+  E extends Writable>
+  Piece<I, V, E, NoMessage, Object> reduceAndBroadcastWithArrayOfHandles(
+      final String name,
+      final int numHandles,
+      final Supplier<ReduceOperation<S, R>> reduceOp,
+      final SupplierFromVertex<I, V, E, Long> handleHashSupplier,
+      final SupplierFromVertex<I, V, E, S> valueSupplier,
+      final ConsumerWithVertex<I, V, E, R> reducedValueConsumer) {
+    // Fail fast: a non-positive count would otherwise only fail later during
+    // execution, e.g. as a division by zero in getHandleIndex.
+    if (numHandles <= 0) {
+      throw new IllegalArgumentException(
+          "numHandles must be positive, got " + numHandles);
+    }
+    return new Piece<I, V, E, NoMessage, Object>() {
+      private ArrayOfHandles.ArrayOfReducers<S, R> reducers;
+      private BroadcastArrayHandle<R> broadcasts;
+
+      /** Maps a vertex to its handle index in [0, numHandles). */
+      private int getHandleIndex(Vertex<I, V, E> vertex) {
+        // The remainder lies in (-numHandles, numHandles), so Math.abs cannot
+        // overflow and the result always fits in an int.
+        return (int) Math.abs(handleHashSupplier.get(vertex) % numHandles);
+      }
+
+      @Override
+      public void registerReducers(
+          final CreateReducersApi reduceApi, Object executionStage) {
+        reducers = new ArrayOfHandles.ArrayOfReducers<>(
+            numHandles,
+            new Supplier<ReducerHandle<S, R>>() {
+              @Override
+              public ReducerHandle<S, R> get() {
+                return reduceApi.createLocalReducer(reduceOp.get());
+              }
+            });
+      }
+
+      @Override
+      public VertexSender<I, V, E> getVertexSender(
+          BlockWorkerSendApi<I, V, E, NoMessage> workerApi,
+          Object executionStage) {
+        return new InnerVertexSender() {
+          @Override
+          public void vertexSend(Vertex<I, V, E> vertex) {
+            reducers.get(getHandleIndex(vertex)).reduce(
+                valueSupplier.get(vertex));
+          }
+        };
+      }
+
+      @Override
+      public void masterCompute(BlockMasterApi master, Object executionStage) {
+        broadcasts = reducers.broadcastValue(master);
+      }
+
+      @Override
+      public VertexReceiver<I, V, E, NoMessage> getVertexReceiver(
+          BlockWorkerReceiveApi<I> workerApi, Object executionStage) {
+        // Fetch every broadcast once per receiver instead of once per vertex.
+        final List<R> values = new ArrayList<>(numHandles);
+        for (int i = 0; i < numHandles; i++) {
+          values.add(broadcasts.get(i).getBroadcast(workerApi));
+        }
+        return new InnerVertexReceiver() {
+          @Override
+          public void vertexReceive(
+              Vertex<I, V, E> vertex, Iterable<NoMessage> messages) {
+            reducedValueConsumer.apply(
+                vertex, values.get(getHandleIndex(vertex)));
+          }
+        };
+      }
+
+      @Override
+      public String toString() {
+        return name;
+      }
+    };
+  }
+
+ /**
* Creates Piece that for each vertex, sends message provided by
* messageSupplier to all targets provided by targetsSupplier.
* Received messages are then passed to and processed by provided